package com.iteaj.iot.test.mqtt;

import com.iteaj.iot.Message;
import com.iteaj.iot.client.ClientProtocolHandle;
import com.iteaj.iot.client.IotClientBootstrap;
import com.iteaj.iot.client.mqtt.MqttClient;
import com.iteaj.iot.client.mqtt.MqttConnectProperties;
import com.iteaj.iot.config.ConnectProperties;
import com.iteaj.iot.test.IotTestProperties;
import com.iteaj.iot.test.TestConst;
import com.iteaj.iot.test.TestProtocolType;
import com.iteaj.iot.test.client.fixed.FixedLengthClient;
import com.iteaj.iot.test.client.fixed.FixedLengthClientMessage;
import com.iteaj.iot.test.client.fixed.FixedLengthRequestProtocol;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.time.Instant;

/**
 * Test handle that schedules the MQTT client test scenarios once its dependencies are injected.
 *
 * <p>Three scenarios are driven from {@link #afterPropertiesSet()}:
 * <ul>
 *     <li>a will (last-testament) test, run only when the configuration has a will topic;</li>
 *     <li>a multi-client publish test, used when the configured client number is greater than 0;</li>
 *     <li>a single-client publish test covering the three QoS levels, used otherwise.</li>
 * </ul>
 *
 * create time: 2021/9/3
 *
 * @author iteaj
 * @since 1.0
 */
public class MqttClientTestHandle implements ClientProtocolHandle<MqttPublishTestProtocol>, InitializingBean {

    /** Client id assigned to the temporary client created for the will test. */
    public static final String IotMqttWillClient = "IotMqttWillClient";

    /** Protocol label passed as the first argument of every result log line. */
    private static final String PROTOCOL_NAME = "mqtt协议";

    @Autowired
    private ThreadPoolTaskScheduler scheduler;
    @Autowired
    private MqttClientTestComponent component;
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * No business handling is performed for the test protocol.
     *
     * @param protocol the protocol delivered to this handle
     * @return always {@code null}
     */
    @Override
    public Object handle(MqttPublishTestProtocol protocol) {
        return null;
    }

    /**
     * Schedules the will test and then either the multi-client or the single-client publish test,
     * depending on the configured client number.
     *
     * @throws Exception declared by {@link InitializingBean}; not thrown explicitly here
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        IotTestProperties.TestMqttConnectProperties config =
                (IotTestProperties.TestMqttConnectProperties) component.getConfig();

        scheduleWillTest(config);

        if (config.getNum() > 0) {
            scheduleMultiClientTest(config);
        } else {
            scheduleSingleClientTest();
        }
    }

    /**
     * Schedules the will test: after 20 seconds a dedicated client is connected, and 10 seconds
     * later its channel is closed and the client is removed from the component.
     */
    private void scheduleWillTest(IotTestProperties.TestMqttConnectProperties config) {
        scheduler.schedule(() -> {
            // The will topic is checked when the task runs, not when it is scheduled.
            if (!StringUtils.hasText(config.getWillTopic())) {
                return;
            }

            // Create a dedicated client for the will test.
            MqttConnectProperties willTopicConfig = copyWithoutIdleTimes(config);
            willTopicConfig.setClientId(IotMqttWillClient);
            MqttClient client = connectNewClient(willTopicConfig);

            scheduler.schedule(() -> {
                // Guard against a missing channel (presumably the case when the connection was
                // never established) so the client is still removed below.
                if (client.getChannel() != null) {
                    client.getChannel().close();
                }
                component.removeClient(willTopicConfig);
            }, Instant.now().plusSeconds(10));
        }, Instant.now().plusSeconds(20));
    }

    /**
     * Schedules the multi-client test: after 20 seconds the configured number of clients is
     * connected; from 30 seconds on, every client of the component publishes an
     * {@code AT_MOST_ONCE} message once per second.
     */
    private void scheduleMultiClientTest(IotTestProperties.TestMqttConnectProperties config) {
        scheduler.schedule(() -> {
            int clientNum = config.getNum();
            for (int i = 0; i < clientNum; i++) {
                connectNewClient(copyWithoutIdleTimes(config));
            }
        }, Instant.now().plusSeconds(20));

        scheduler.scheduleAtFixedRate(() -> {
            component.clients().forEach(item -> {
                // At-most-once publish test.
                new MqttPublishTestProtocol(MqttQoS.AT_MOST_ONCE, "most").timeout(0)
                        .request(((MqttClient) item).getConfig(), protocol -> {
                            final Message.MessageHead head = protocol.requestMessage().getHead();
                            logResult(MqttQoS.AT_MOST_ONCE, head, protocol.getExecStatus().desc);
                            return null;
                        });
            });
        }, Instant.now().plusSeconds(30), Duration.ofSeconds(1));
    }

    /**
     * Schedules the single-client test: starting after 18 seconds and repeating every 30 seconds,
     * publishes with each QoS level plus a broker round-trip request, pausing between the steps.
     */
    private void scheduleSingleClientTest() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // At-most-once publish test.
                new MqttPublishTestProtocol(MqttQoS.AT_MOST_ONCE, "most").timeout(0).request(protocol -> {
                    final Message.MessageHead head = protocol.requestMessage().getHead();
                    logResult(MqttQoS.AT_MOST_ONCE, head, protocol.getExecStatus().desc);
                    return null;
                });

                // Test client->broker->client->broker->client.
                new MqttPublishTestProtocol(MqttQoS.AT_MOST_ONCE, "broker").request(protocol -> {
                    final Message.MessageHead head = protocol.responseMessage().getHead();
                    logResult("broker主动请求测试", head, protocol.getExecStatus().desc);
                    return null;
                });
                Thread.sleep(1000);

                // At-least-once publish test.
                new MqttPublishTestProtocol(MqttQoS.AT_LEAST_ONCE, "least").request(protocol -> {
                    final Message.MessageHead head = protocol.requestMessage().getHead();
                    logResult(MqttQoS.AT_LEAST_ONCE, head, protocol.getExecStatus().desc);
                    return null;
                });
                Thread.sleep(2000);

                // Exactly-once publish test.
                new MqttPublishTestProtocol(MqttQoS.EXACTLY_ONCE, "exactly").request(protocol -> {
                    final Message.MessageHead head = protocol.requestMessage().getHead();
                    logResult(MqttQoS.EXACTLY_ONCE, head, protocol.getExecStatus().desc);
                    return null;
                });
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the scheduler thread can observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("MQTT publish test task interrupted", e);
            } catch (Exception e) {
                // Keep the best-effort behaviour: report the failure and let the next run proceed.
                logger.error("MQTT publish test task failed", e);
            }
        }, Instant.now().plusSeconds(18), Duration.ofSeconds(30));
    }

    /**
     * Copies every property except {@code clientId} from the given configuration into a new
     * {@link MqttConnectProperties} and sets all three idle times to 0.
     */
    private MqttConnectProperties copyWithoutIdleTimes(IotTestProperties.TestMqttConnectProperties config) {
        MqttConnectProperties properties = new MqttConnectProperties();
        BeanUtils.copyProperties(config, properties, "clientId");
        // NOTE(review): 0 presumably disables idle detection for the test clients - confirm.
        properties.setAllIdleTime(0);
        properties.setWriterIdleTime(0);
        properties.setReaderIdleTime(0);
        return properties;
    }

    /** Creates a client for the given properties, initializes it and starts connecting it. */
    private MqttClient connectNewClient(MqttConnectProperties properties) {
        MqttClient client = component.createNewClient(properties);
        client.init(IotClientBootstrap.clientGroup);
        client.connect(null, 1000);
        return client;
    }

    /** Logs one test result line: protocol label, scenario tag, equip code, message id and status. */
    private void logResult(Object tag, Message.MessageHead head, Object statusDesc) {
        logger.info(TestConst.LOGGER_PROTOCOL_DESC, PROTOCOL_NAME, tag
                , head.getEquipCode(), head.getMessageId(), statusDesc);
    }
}
